home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Personal Computer World 2009 February
/
PCWFEB09.iso
/
Software
/
Resources
/
Chat & Communication
/
Digsby build 37
/
digsby_setup.exe
/
lib
/
util
/
introspect.pyo
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2008-10-13
|
31KB
|
986 lines
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
from __future__ import with_statement
import sys
import time
import threading
import string
import logging
import re
import keyword
import types
import inspect
import primitives
from weakref import ref
from types import GeneratorType
from path import path
from collections import defaultdict
from traceback import print_exc
import warnings
log = logging.getLogger('util.introspect')
oldvars = vars
def uncollectable(clz):
import gc as gc
gc.collect()
return _[1]
class debug_property(object):
def __init__(self, fget = None, fset = None, fdel = None, doc = None):
self._debug_property__get = fget
self._debug_property__set = fset
self._debug_property__del = fdel
self.__doc__ = doc
def __get__(self, inst, type = None):
if inst is None:
return self
if self._debug_property__get is None:
raise AttributeError, 'unreadable attribute'
try:
return self._debug_property__get(inst)
except AttributeError:
e = None
print_exc()
raise AssertionError('attribute error during __get__')
def __set__(self, inst, value):
if self._debug_property__set is None:
raise AttributeError, "can't set attribute"
try:
return self._debug_property__set(inst, value)
except AttributeError:
e = None
print_exc()
raise AssertionError('attribute error during __set__')
def __delete__(self, inst):
if self._debug_property__del is None:
raise AttributeError, "can't delete attribute"
try:
return self._debug_property__del(inst)
except AttributeError:
e = None
print_exc()
raise AssertionErrror('attribute error during __del__')
def vars(obj = None):
res = { }
if hasattr(obj, '__dict__'):
return oldvars(obj)
elif hasattr(obj, '__slots__'):
return (dict,)((lambda .0: for attr in .0:
(attr, getattr(obj, attr, sentinel)))(obj.__slots__))
elif hasattr(obj, 'keys'):
return obj
else:
return dict((lambda .0: for x in .0:
(x, sentinel))(obj))
version_23 = sys.version_info < (2, 4)
def this_list():
d = inspect.currentframe(1).f_locals
nestlevel = 1
while '_[%d]' % nestlevel in d:
nestlevel += 1
result = d['_[%d]' % (nestlevel - 1)]
if version_23:
return result.__self__
else:
return result
def cannotcompile(f):
return f
def stack_trace(level = 1, frame = None):
try:
if frame is None:
f = sys._getframe(level)
else:
f = frame
frames = []
while f is not None:
c = f.f_code
frames.insert(0, (c.co_filename, c.co_firstlineno, c.co_name))
f = f.f_back
return frames
finally:
del f
del frame
def print_stack_trace(frame = None):
trace = stack_trace(2, frame)
for frame in trace:
print ' File "%s", line %d, in %s' % frame
def print_stack_traces():
frames = sys._current_frames().items()
for id, frame in frames:
print 'Frame %d:' % id
print_stack_trace(frame)
def is_all(seq, my_types = None):
if not seq:
try:
iter(my_types)
except:
t = my_types
else:
t = my_types[0]
finally:
return (True, t)
if type(my_types) == type:
my_types = [
my_types]
if my_types == None:
my_types = [
type(seq[0])]
all = True
for elem in seq:
if type(elem) not in my_types:
all = False
break
continue
if all:
return (all, my_types[0])
else:
return (all, None)
def shift(l, varnames):
vars = varnames.split()
n = len(vars) - 1
l = l[:n] + [
l[n:]]
caller = sys._getframe().f_back
new_locals = dict(zip(vars, l))
for var, val in new_locals.items():
caller.f_locals[var] = val
print caller.f_locals
def get_func_name(level = 1):
return sys._getframe(level).f_code.co_name
def get_func(obj, command, *params):
try:
func = getattr(obj, command.lower())
log.debug('Finding %s.%s%s', obj.__class__.__name__, command.lower(), params)
except AttributeError:
obj.__class__.__name__(command.lower(), ', '.join, [], []([ repr(x) for x in params ]))
func = None
except:
'%s has no function to handle: %s(%s)'
return func
def decormethod(decorator):
def wrapper(method):
return (lambda self: decorator(self, method, *args, **kw))
return wrapper
def funcToMethod(func, clas, method_name = None):
func.im_class = clas
func.im_func = func
func.im_self = None
if not method_name:
method_name = func.__name__
clas.__dict__[method_name] = func
def attach_method(obj, func, name = None):
if not name:
pass
name = func.__name__
cls = obj.__class__
cls.temp_foo = func
obj.__setattr__(name, cls.temp_foo)
del cls.temp_foo
def isgeneratormethod(object):
return isinstance(getattr(object, '__self__', None), GeneratorType)
CO_VARARGS = 4
CO_VARKEYWORDS = 8
def callany(func, *args):
if not callable(func):
raise TypeError, "callany's first argument must be callable"
CallLater = CallLater
CallLaterDelegate = CallLaterDelegate
CallbackSequence = CallbackSequence
import util.callbacks
c = func
while isinstance(c, CallLater):
c = c.cb
if isinstance(c, CallbackSequence):
c = c.__call__
if hasattr(c, 'im_func'):
code = c.im_func.func_code
nargs = code.co_argcount - 1
codeflags = code.co_flags
elif hasattr(c, 'func_code'):
code = c.func_code
nargs = code.co_argcount
codeflags = code.co_flags
else:
code = None
codeflags = 0
nargs = len(args)
hasargs = codeflags & CO_VARARGS
haskwargs = codeflags & CO_VARKEYWORDS
if haskwargs:
args = []
msg = 'callany given a kwarg function (%r): no arguments will be passed!' % funcinfo(c)
warnings.warn(msg)
if getattr(sys, 'DEV', False):
raise AssertionError(msg)
if not hasargs:
args = list(args)[:nargs]
args += [
None] * (nargs - len(args))
return func(*args)
def pythonize(s, lower = True):
if not isinstance(s, basestring):
raise TypeError, 'Only string/unicode types can be pythonized!'
allowed = string.letters + string.digits + '_'
s = str(s).strip()
if s.startswith('__') and s.endswith('__'):
s = s[2:-2]
s = None if s[0] in string.digits else '' + s
s = None if keyword.iskeyword(s) else '' + s
new_s = ''
for ch in s:
None += new_s if ch in allowed else '_'
if lower:
new_s = new_s.lower()
return new_s
attached_functions = { }
def dyn_dispatch(obj, func_name, *args, **kwargs):
func_name = pythonize(str(func_name))
if not hasattr(obj, func_name):
fn = sys._getframe(1).f_code.co_filename
d = dict(name = func_name)
d.update(kwargs)
code = str(obj.func_templ % d)
f = open(fn, 'a')
f.write(code)
f.close()
newcode = ''
code = code.replace('\n ', '\n')
exec code
attach_method(obj, locals()[func_name])
l = attached_functions.setdefault(obj.__class__, [])
l.append(func_name)
if func_name in attached_functions.setdefault(obj.__class__, []):
args = [
obj] + list(args)
return getattr(obj, func_name)(*args, **kwargs)
class CallTemplate(string.Template):
def __init__(self, templ):
string.Template.__init__(self, templ)
self.placeholders = [ m[1] for m in re.findall(self.pattern, self.template) ]
def __call__(self, *args, **kws):
return self.substitute(**primitives.dictadd(zip(self.placeholders, args), kws))
_profilers_enabled = False
def set_profilers_enabled(val):
global _profilers_enabled
_profilers_enabled = val
def use_profiler(target, callable):
target.profiler = EnableDisableProfiler()
def cb():
if not _profilers_enabled:
target.profiler.disable()
callable()
if sys.platform == 'win32':
SEHGuard = SEHGuard
import wx
else:
SEHGuard = lambda c: c()
(None, target.profiler.runcall)((lambda : SEHGuard(cb)))
def all_profilers():
return dict((lambda .0: for thread in .0:
if hasattr(thread, 'profiler'):
(thread, thread.profiler)continue)(threading.enumerate()))
def get_profile_report(profiler):
Stats = Stats
import pstats
StringIO = StringIO
import cStringIO
io = StringIO()
stats = Stats(profiler, stream = io)
io.write('\nby cumulative time:\n\n')
stats.sort_stats('cumulative').print_stats(25)
io.write('\nby number of calls:\n\n')
stats.sort_stats('time').print_stats(25)
return io.getvalue()
from cProfile import Profile
Profile.report = get_profile_report
def profilereport():
s = []
for thread, profiler in all_profilers().iteritems():
s.extend([
repr(thread),
profiler.report()])
return '\n'.join(s)
class Memoize(object):
__slots__ = [
'func',
'cache']
def __init__(self, func):
self.func = func
self.cache = { }
def __repr__(self):
return '<Memoize for %r (%d items)>' % (funcinfo(self.func), len(self.cache))
def __call__(self, *args, **kwargs):
key = (args, tuple(kwargs.items()))
cache = self.cache
try:
return cache[key]
except KeyError:
return cache.setdefault(key, self.func(*args, **kwargs))
memoize = Memoize
memoizedprop = lambda getter: property(memoize(getter))
def print_timing(num_runs = 1):
def wrapper1(func):
def wrapper(*arg, **kwargs):
t1 = time.clock()
for i in range(num_runs):
res = func(*arg, **kwargs)
t2 = time.clock()
print '%s took %0.3fms.' % (func.func_name, (t2 - t1) * 1000 / float(num_runs))
return res
return wrapper
return wrapper1
class i(int):
def __iter__(self):
return iter(xrange(self))
def baseclasses(cls):
if not hasattr(cls, '__bases__'):
raise TypeError('%r does not have __bases__' % cls)
bases = []
for c in cls.__bases__:
bases.append(c)
bases.extend(baseclasses(c))
return bases
def reload_(obj):
for klass in reversed(obj.__class__.__mro__):
if klass not in __builtins__:
reload(sys.modules[klass.__module__])
continue
return reload2_(obj)
def reload2_(obj):
m = sys.modules[obj.__class__.__module__]
m = reload(m)
cl = getattr(m, obj.__class__.__name__)
obj.__class__ = cl
return sys.modules[obj.__class__.__module__]
def bitflags_enabled(map, flags):
bits = _[1]
return _[2]
def import_module(modulePath):
try:
aMod = sys.modules[modulePath]
if not isinstance(aMod, types.ModuleType):
raise KeyError
except KeyError:
aMod = __import__(modulePath, globals(), locals(), [
''])
sys.modules[modulePath] = aMod
return aMod
def import_function(fullFuncName):
if not isinstance(fullFuncName, basestring):
raise TypeError('import_function needs a string, you gave a %s' % type(fullFuncName))
lastDot = fullFuncName.rfind(u'.')
funcName = fullFuncName[lastDot + 1:]
modPath = fullFuncName[:lastDot]
aMod = import_module(modPath)
aFunc = getattr(aMod, funcName)
return aFunc
def base_classes(clazz):
classes = []
for cl in clazz.__bases__:
classes += [
cl] + base_classes(cl)
return list(set(classes))
base_classes = memoize(base_classes)
def wrapfunc(obj, name, processor, avoid_doublewrap = True):
call = getattr(obj, name)
if avoid_doublewrap and getattr(call, 'processor', None) is processor:
return None
original_callable = getattr(call, 'im_func', call)
def wrappedfunc(*args, **kwargs):
return processor(original_callable, *args, **kwargs)
wrappedfunc.original = call
wrappedfunc.processor = processor
wrappedfunc.__name__ = getattr(call, '__name__', name)
if inspect.isclass(obj):
if hasattr(call, 'im_self'):
if call.im_self:
wrappedfunc = classmethod(wrappedfunc)
else:
wrappedfunc = staticmethod(wrappedfunc)
setattr(obj, name, wrappedfunc)
def unwrapfunc(obj, name):
setattr(obj, name, getattr(obj, name).original)
def tracing_processor(original_callable, *args, **kwargs):
r_name = getattr(original_callable, '__name__', '<unknown>')
r_args = [ (primitives.try_this,)((lambda : repr(a)), '<%s at %s>' % (type(a), id(a))) for None in args ]
[]([ '%s-%r' % x for x in kwargs.iteritems() ])
print '-> %s(%s)' % (r_name, ', '.join(r_args))
return original_callable(*args, **kwargs)
def add_tracing(class_object, method_name):
wrapfunc(class_object, method_name, tracing_processor)
def trace(clz):
for meth, v in inspect.getmembers(clz, inspect.ismethod):
if not meth.startswith('__'):
add_tracing(clz, meth)
continue
def typecounts(contains = None, objs = None):
import gc
if objs is None:
objs = gc.get_objects()
counts = defaultdict(int)
for obj in objs:
counts[type(obj).__name__] += 1
if contains is not None:
contains = lambda s, ss = contains: s[0].find(ss) != -1
return filter(contains, sorted(counts.iteritems(), key = (lambda a: a[1]), reverse = True))
def funcinfo(func):
if not hasattr(func, 'func_code'):
return repr(func)
name = getattr(func, '__name__', getattr(getattr(func, '__class__', None), '__name__', '<UNKNOWN OBJECT>'))
c = func.func_code
filename = c.co_filename
if not isinstance(filename, str):
filename = '??'
else:
try:
filepath = path(c.co_filename)
if filepath.name == '__init__.py':
filename = filepath.parent.name + '/' + filepath.name
else:
filename = filepath.name
except Exception:
print_exc()
return '<%s (%s:%s)>' % (name, filename, c.co_firstlineno)
def leakfinder():
import wx
pprint = pprint
import pprint
typecounts = typecounts
import util
import gc
f = wx.Frame(None, pos = (30, 30), style = wx.DEFAULT_FRAME_STYLE | wx.FRAME_TOOL_WINDOW | wx.STAY_ON_TOP)
b = wx.Button(f, -1, 'memory stats')
b2 = wx.Button(f, -1, 'all functions')
b3 = wx.Button(f, -1, 'all unnamed lambdas')
sz = f.Sizer = wx.BoxSizer(wx.VERTICAL)
sz.AddMany([
b,
b2,
b3])
f.stats = { }
def onstats(e):
new = typecounts()
news = dict(new)
for cname in news.keys():
if cname in f.stats:
diff = news[cname] - f.stats[cname][0]
f.stats[cname] = (news[cname], diff)
continue
f.stats[cname] = (news[cname], 0)
print '****' * 10
pprint(sorted(f.stats.iteritems(), key = (lambda a: a[1])))
def on2(e, filterName = ((None, None, None), None)):
funcs = _[1]
counts = defaultdict(int)
for f in funcs:
pass
print '((Filename, Line Number), Count)'
pprint(sorted(list(counts.iteritems()), key = (lambda i: i[1])))
b.Bind(wx.EVT_BUTTON, onstats)
b2.Bind(wx.EVT_BUTTON, on2)
b3.Bind((wx.EVT_BUTTON,), (lambda e: on2(e, '<lambda>')))
f.Sizer.Layout()
f.Fit()
f.Show()
def counts(seq, groupby):
counts = defaultdict(int)
for obj in seq:
counts[groupby(obj)] += 1
return sorted((lambda .0: for val, count in .0:
(count, val))(counts.iteritems()), reverse = True)
class InstanceTracker(object):
def track(self):
try:
_instances = self.__class__._instances
except AttributeError:
self.__class__._instances = [
ref(self)]
for wref in _instances:
if wref() is self:
break
continue
def all(cls):
objs = []
try:
wrefs = cls._instances
except AttributeError:
return []
import wx
for wref in wrefs[:]:
obj = wref()
if obj is not None:
if wx.IsDestroyed(obj):
wrefs.remove(wref)
else:
objs.append(obj)
wx.IsDestroyed(obj)
return objs
all = classmethod(all)
def CallAll(cls, func, *args, **kwargs):
import wx
try:
instances = cls._instances
except AttributeError:
return None
removeList = []
for wref in instances:
obj = wref()
if obj is not None and not wx.IsDestroyed(obj):
try:
func(obj, *args, **kwargs)
except TypeError:
print type(obj), repr(obj)
raise
except:
None<EXCEPTION MATCH>TypeError
None<EXCEPTION MATCH>TypeError
removeList.append(wref)
for wref in removeList:
try:
instances.remove(wref)
continue
except ValueError:
continue
CallAll = classmethod(CallAll)
class DeadObjectError(AttributeError):
pass
class DeadObject(object):
reprStr = 'Placeholder for DELETED %s object! Please unhook all callbacks, observers, and event handlers PROPERLY.'
attrStr = 'Attribute access no longer allowed - This object has signaled that it is no longer valid!'
def __repr__(self):
if not hasattr(self, '_name'):
self._name = '[unknown]'
return self.reprStr % self._name
def __getattr__(self, *args):
if not hasattr(self, '_name'):
self._name = '[unknown]'
raise DeadObjectError(self.attrStr % self._name)
def __nonzero__(self):
return 0
def gc_diagnostics(stream = None):
import gc
import sys
import linecache as linecache
import locale as locale
itemgetter = itemgetter
import operator
ifilterfalse = ifilterfalse
imap = imap
import itertools
getrefcount = sys.getrefcount
if stream is None:
stream = sys.stdout
linecache.clearcache()
gc.collect()
def w(s):
stream.write(s + '\n')
filter_objects = ((), '')
filter_types = (type,)
objs = _[1]
itemgetter0 = itemgetter(0)
itemgetter1 = itemgetter(1)
objs.sort(key = itemgetter0)
num_objs = len(objs)
w('%d objects' % num_objs)
N = 600
notallowed = (basestring,)
import __builtin__ as __builtin__
blacklist = set()
oldlen = 0
modlen = len(sys.modules)
while modlen != oldlen:
blacklist |= set((lambda .0: for m in .0:
if m:
id(m.__dict__)continue)(sys.modules.itervalues()))
for m in sys.modules.values():
if m and hasattr(m, '__docfilter__'):
blacklist.add(id(m.__docfilter__._globals))
continue
[]
oldlen = modlen
modlen = len(sys.modules)
continue
[]
blacklist.add(id(__builtin__))
blacklist.add(id(__builtin__.__dict__))
blacklist.add(id(sys.modules))
blacklist.add(id(locale.locale_alias))
if sys.modules.get('stringprep', None) is not None:
import stringprep
blacklist.add(id(stringprep.b3_exceptions))
blacklist.add(id(objs))
def blacklisted(z):
if not isinstance(z, notallowed) and id(z) in blacklist:
pass
return z is blacklist
def blacklisted_1(z):
z = z[-1]
return blacklisted(z)
def large_sequence(z):
try:
if len(z) > 300:
pass
return not blacklisted(z)
except:
pass
def saferepr(obj):
try:
return repr(obj)
except Exception:
e = None
try:
return '<%s>' % type(obj).__name__
return '<??>'
num_most_reffed = min(int(num_objs * 0.05), 20)
most_reffed = ifilterfalse(blacklisted_1, reversed(objs))
w('\n\n')
w('*** top %d referenced objects' % num_most_reffed)
w('sys.getrefcount(obj), repr(obj)[:1000]')
for nil in xrange(num_most_reffed):
try:
(rcount, obj) = most_reffed.next()
except StopIteration:
(((None, None),),)
(((None, None),),)
break
except:
(((None, None),),)
w('%d %d: %s' % (rcount, id(obj), saferepr(obj)[:1000]))
w('\n\n')
w('*** objects with __len__ more than %d' % N)
w('__len__(obj), repr(obj)[:1000]')
large_objs = [](_[2], key = itemgetter0, reverse = True)
for count, _id, s in large_objs:
if _id != id(objs):
w('count %d id %d: %s' % (count, _id, s))
continue
[]
w('\n\n')
_typecounts = typecounts(objs = imap(itemgetter1, objs))
num_types = 20
w('*** top %d instantiated types' % num_types)
builtin_names = set(__builtins__.keys())
tc_iter = (sorted, ifilterfalse)((lambda _x: builtin_names.__contains__(itemgetter0(_x))), _typecounts)
for nil in range(num_types):
try:
(tname, tcount) = tc_iter.next()
except StopIteration:
(((None, None),),)
(((None, None),),)
break
except:
(((None, None),),)
w('%d: %r' % (tcount, tname))
funcinfos = defaultdict(int)
for refcount, obj in objs:
if callable(obj):
try:
finfo = funcinfo(obj)
except:
(((None, None),),)
continue
funcinfos[finfo] += refcount
continue
(((None, None),),)
num_infos = min(len(funcinfos), 20)
funcinfos = funcinfos.items()
funcinfos.sort(key = itemgetter1, reverse = True)
w('\n\n')
w('*** %d most referenced callables' % num_infos)
for i in range(num_infos):
(finfo, frcount) = funcinfos[i]
w('%d: %r' % (frcount, finfo))
try:
import wx
except ImportError:
pass
w('\n\n*** top level windows')
for tlw in wx.GetTopLevelWindows():
w(saferepr(tlw))
w('\n\n*** gc.garbage')
if not gc.garbage:
w('(none)')
else:
for obj in gc.garbage:
w(saferepr(obj))
import cProfile
import _lsprof
class EnableDisableProfiler(cProfile.Profile):
def __init__(self, *a, **k):
self.enabled = False
_lsprof.Profiler.__init__(self, *a, **k)
def enable(self):
self.enabled = True
return _lsprof.Profiler.enable(self)
def disable(self):
self.enabled = False
return _lsprof.Profiler.disable(self)
if __name__ == '__main__':
from pprint import pprint
pprint(typecounts('Graphics'))